package com.sugo.seckill.queue.mq;


import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ser.Serializers;
import com.sugo.seckill.error.BaseException;
import com.sugo.seckill.http.HttpStatus;
import com.sugo.seckill.mapper.order.SeckillGoodsMapper;
import com.sugo.seckill.order.service.SeckillOrderService;
import com.sugo.seckill.pojo.TbSeckillGoods;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.redisson.misc.Hash;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * RocketMQ producer used by the seckill flow.
 *
 * <p>Wraps a single {@link TransactionMQProducer} that is created when the bean is
 * initialised and shut down when the bean is destroyed. It offers plain synchronous
 * sends as well as a transactional send whose local transaction places the order
 * through {@link SeckillOrderService}.
 */
@Component
public class MqProducer {

    private static final String PRODUCER_GROUP = "seckillGroup";
    private static final String NAMESRV_ADDR = "172.17.61.90:9876";

    /** Topic and tag shared by the stock-synchronisation messages. */
    private static final String STOCK_TOPIC = "seckill_goods_asnc_stock";
    private static final String STOCK_TAG = "increase";

    /** Keys of the JSON payload carried by transactional messages. */
    private static final String KEY_SECKILL_ID = "seckillId";
    private static final String KEY_USER_ID = "userId";

    private TransactionMQProducer producer;

    // Used to give stock back in Redis when placing the order fails.
    @Autowired
    private RedisTemplate redisTemplate;

    // Order service that runs the actual order placement.
    @Autowired
    private SeckillOrderService orderService;

    // Mapper used to read and update the transaction status stored on the goods row.
    @Autowired
    private SeckillGoodsMapper seckillGoodsMapper;

    /**
     * Creates, configures and starts the producer.
     *
     * <p>The transaction listener is registered before {@code start()} so the producer
     * is fully configured by the time it is started. A start-up failure is only printed,
     * as before; the bean is still created in that case.
     */
    @PostConstruct
    public void initProducer() {
        producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.setRetryTimesWhenSendFailed(3);

        producer.setTransactionListener(new TransactionListener() {

            // The order is placed here, as the local transaction of the message.
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return executeOrderTransaction(message);
            }

            // Answers the broker's check-back about the state of the local transaction.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return checkOrderTransaction(messageExt);
            }
        });

        try {
            producer.start();
            System.out.println("[Producer 已启动]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a message synchronously.
     *
     * @param topic destination topic
     * @param tags  message tags
     * @param msg   message body, encoded with {@link RemotingHelper#DEFAULT_CHARSET}
     * @return a small JSON document of the form {@code {"MsgId":"<id>"}}
     * @throws IllegalStateException if the message could not be sent; the original
     *                               failure is attached as the cause
     */
    public String send(String topic, String tags, String msg) {
        try {
            Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = producer.send(message);
            System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
            return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            // There is no send result to report, so fail with the real cause instead of
            // dereferencing a null result.
            throw new IllegalStateException("Failed to send message to topic " + topic, e);
        }
    }

    /** Shuts the producer down when the bean is destroyed. */
    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }

    /**
     * Sends a plain message carrying the seckill id so that the database stock can be
     * synchronised.
     *
     * @param seckillId id of the seckill goods; sent as its decimal string form
     * @return {@code true} if the send call completed, {@code false} if it threw
     */
    public boolean asncSendMsg(Long seckillId) {
        try {
            Message message = new Message(STOCK_TOPIC, STOCK_TAG,
                    (seckillId + "").getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(message);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Sends a transactional message whose local transaction places the order for the
     * given user.
     *
     * <p>NOTE(review): the return value only reflects whether the send call threw. The
     * local transaction state in the send result is not inspected, so {@code true} is
     * also returned when the local transaction rolled back. Confirm that callers expect
     * this.
     *
     * @param seckillId id of the seckill goods
     * @param userId    id of the user placing the order
     * @return {@code true} if the send call completed, {@code false} if it threw
     */
    public boolean asncSendTransactionMsg(Long seckillId, String userId) {
        try {
            Map<String, String> payload = new HashMap<>();
            payload.put(KEY_SECKILL_ID, seckillId + "");
            payload.put(KEY_USER_ID, userId);

            String jsonStr = JSON.toJSONString(payload);

            Message message = new Message(STOCK_TOPIC, STOCK_TAG,
                    jsonStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Local transaction of a transactional message: decodes the payload and places the
     * order.
     *
     * @return {@code COMMIT_MESSAGE} when the order service returns normally,
     *         {@code ROLLBACK_MESSAGE} when the body cannot be decoded or the order
     *         service throws a {@link BaseException}
     */
    private LocalTransactionState executeOrderTransaction(Message message) {
        Map<String, String> payload;
        try {
            payload = parseBody(message);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            // Nothing was processed, so the message must not be committed.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        if (payload == null) {
            // An empty body carries no order to place.
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        String seckillId = payload.get(KEY_SECKILL_ID);
        String userId = payload.get(KEY_USER_ID);

        try {
            orderService.startSubmitOrderByRedis(Long.parseLong(seckillId), userId);
        } catch (BaseException e) {
            // For this error code the stock counter in Redis is incremented again.
            if (e.getCode() == HttpStatus.SC_METHOD_FAILURE) {
                redisTemplate.opsForValue().increment("seckill_goods_stock_" + seckillId, 1);
            }

            // Record the failure (status 2) so that a later check-back rolls back too.
            TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillId);
            if (seckillGoods != null) {
                seckillGoods.setTransactionStatus(2);
                seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
            }

            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * Resolves the state of a local transaction from the transaction status stored on
     * the goods row: 0 is unknown, 1 is commit, 2 is rollback, and any other value is
     * treated as rollback.
     *
     * <p>{@code UNKNOW} is returned, never {@code null}, whenever the state cannot be
     * determined: undecodable or empty body, missing goods row, or missing status.
     */
    private LocalTransactionState checkOrderTransaction(MessageExt messageExt) {
        Map<String, String> payload;
        try {
            payload = parseBody(messageExt);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return LocalTransactionState.UNKNOW;
        }
        if (payload == null) {
            return LocalTransactionState.UNKNOW;
        }

        String seckillId = payload.get(KEY_SECKILL_ID);

        TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillId);
        if (seckillGoods == null) {
            return LocalTransactionState.UNKNOW;
        }

        Integer transactionStatus = seckillGoods.getTransactionStatus();
        if (transactionStatus == null) {
            return LocalTransactionState.UNKNOW;
        }

        switch (transactionStatus) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
            default:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * Decodes a message body into the key/value payload written by
     * {@link #asncSendTransactionMsg(Long, String)}.
     *
     * <p>{@code JSON.parseObject} is required here: {@code JSON.toJSON} applied to a
     * {@code String} does not parse it, so the result cannot be used as a map.
     */
    @SuppressWarnings("unchecked")
    private static Map<String, String> parseBody(Message message) throws UnsupportedEncodingException {
        String jsonStr = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
        return JSON.parseObject(jsonStr, Map.class);
    }

    /** Restores the thread's interrupt flag when a broad catch swallowed an interruption. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
